# -*- coding: utf-8 -*-
"""
Functions which implement running reactor jobs
"""


# Import python libs
from __future__ import absolute_import, print_function, unicode_literals

import fnmatch
import glob
import logging

# Import salt libs
import salt.client
import salt.defaults.exitcodes
import salt.runner
import salt.state
import salt.utils.args
import salt.utils.cache
import salt.utils.data
import salt.utils.event
import salt.utils.files
import salt.utils.process
import salt.utils.yaml
import salt.wheel

# Import 3rd-party libs
from salt.ext import six

log = logging.getLogger(__name__)

REACTOR_INTERNAL_KEYWORDS = frozenset(
    ["__id__", "__sls__", "name", "order", "fun", "state"]
)


class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
    """
    Read in the reactor configuration variable and compare it to events
    processed on the master.
    The reactor has the capability to execute pre-programmed executions
    as reactions to events
    """

    aliases = {
        "cmd": "local",
    }

    def __init__(self, opts, **kwargs):
        super(Reactor, self).__init__(**kwargs)
        local_minion_opts = opts.copy()
        local_minion_opts["file_client"] = "local"
        self.minion = salt.minion.MasterMinion(local_minion_opts)
        salt.state.Compiler.__init__(self, opts, self.minion.rend)

    # We need __setstate__ and __getstate__ to avoid pickling errors since
    # 'self.rend' (from salt.state.Compiler) contains a function reference
    # which is not picklable.
    # These methods are only used when pickling so will not be used on
    # non-Windows platforms.
    def __setstate__(self, state):
        Reactor.__init__(
            self,
            state["opts"],
            log_queue=state["log_queue"],
            log_queue_level=state["log_queue_level"],
        )

    def __getstate__(self):
        return {
            "opts": self.opts,
            "log_queue": self.log_queue,
            "log_queue_level": self.log_queue_level,
        }

    def render_reaction(self, glob_ref, tag, data):
        """
        Execute the render system against a single reaction file and return
        the data structure
        """
        react = {}

        if glob_ref.startswith("salt://"):
            glob_ref = self.minion.functions["cp.cache_file"](glob_ref) or ""
        globbed_ref = glob.glob(glob_ref)
        if not globbed_ref:
            log.error(
                "Can not render SLS %s for tag %s. File missing or not found.",
                glob_ref,
                tag,
            )
        for fn_ in globbed_ref:
            try:
                res = self.render_template(fn_, tag=tag, data=data)

                # for #20841, inject the sls name here since verify_high()
                # assumes it exists in case there are any errors
                for name in res:
                    res[name]["__sls__"] = fn_

                react.update(res)
            except Exception:  # pylint: disable=broad-except
                log.exception('Failed to render "%s": ', fn_)
        return react

    def list_reactors(self, tag):
        """
        Take in the tag from an event and return a list of the reactors to
        process
        """
        log.debug("Gathering reactors for tag %s", tag)
        reactors = []
        if isinstance(self.opts["reactor"], six.string_types):
            try:
                with salt.utils.files.fopen(self.opts["reactor"]) as fp_:
                    react_map = salt.utils.yaml.safe_load(fp_)
            except (OSError, IOError):
                log.error('Failed to read reactor map: "%s"', self.opts["reactor"])
            except Exception:  # pylint: disable=broad-except
                log.error(
                    'Failed to parse YAML in reactor map: "%s"', self.opts["reactor"]
                )
        else:
            react_map = self.opts["reactor"]
        for ropt in react_map:
            if not isinstance(ropt, dict):
                continue
            if len(ropt) != 1:
                continue
            key = next(six.iterkeys(ropt))
            val = ropt[key]
            if fnmatch.fnmatch(tag, key):
                if isinstance(val, six.string_types):
                    reactors.append(val)
                elif isinstance(val, list):
                    reactors.extend(val)
        return reactors

    def list_all(self):
        """
        Return a list of the reactors
        """
        if isinstance(self.minion.opts["reactor"], six.string_types):
            log.debug("Reading reactors from yaml %s", self.opts["reactor"])
            try:
                with salt.utils.files.fopen(self.opts["reactor"]) as fp_:
                    react_map = salt.utils.yaml.safe_load(fp_)
            except (OSError, IOError):
                log.error('Failed to read reactor map: "%s"', self.opts["reactor"])
            except Exception:  # pylint: disable=broad-except
                log.error(
                    'Failed to parse YAML in reactor map: "%s"', self.opts["reactor"]
                )
        else:
            log.debug("Not reading reactors from yaml")
            react_map = self.minion.opts["reactor"]
        return react_map

    def add_reactor(self, tag, reaction):
        """
        Add a reactor
        """
        reactors = self.list_all()
        for reactor in reactors:
            _tag = next(six.iterkeys(reactor))
            if _tag == tag:
                return {"status": False, "comment": "Reactor already exists."}

        self.minion.opts["reactor"].append({tag: reaction})
        return {"status": True, "comment": "Reactor added."}

    def delete_reactor(self, tag):
        """
        Delete a reactor
        """
        reactors = self.list_all()
        for reactor in reactors:
            _tag = next(six.iterkeys(reactor))
            if _tag == tag:
                self.minion.opts["reactor"].remove(reactor)
                return {"status": True, "comment": "Reactor deleted."}

        return {"status": False, "comment": "Reactor does not exists."}

    def resolve_aliases(self, chunks):
        """
        Preserve backward compatibility by rewriting the 'state' key in the low
        chunks if it is using a legacy type.
        """
        for idx, _ in enumerate(chunks):
            new_state = self.aliases.get(chunks[idx]["state"])
            if new_state is not None:
                chunks[idx]["state"] = new_state

    def reactions(self, tag, data, reactors):
        """
        Render a list of reactor files and returns a reaction struct
        """
        log.debug("Compiling reactions for tag %s", tag)
        high = {}
        chunks = []
        try:
            for fn_ in reactors:
                high.update(self.render_reaction(fn_, tag, data))
            if high:
                errors = self.verify_high(high)
                if errors:
                    log.error(
                        "Unable to render reactions for event %s due to "
                        "errors (%s) in one or more of the sls files (%s)",
                        tag,
                        errors,
                        reactors,
                    )
                    return []  # We'll return nothing since there was an error
                chunks = self.order_chunks(self.compile_high_data(high))
        except Exception as exc:  # pylint: disable=broad-except
            log.exception("Exception encountered while compiling reactions")

        self.resolve_aliases(chunks)
        return chunks

    def call_reactions(self, chunks):
        """
        Execute the reaction state
        """
        for chunk in chunks:
            self.wrap.run(chunk)

    def run(self):
        """
        Enter into the server loop
        """
        salt.utils.process.appendproctitle(self.__class__.__name__)

        # instantiate some classes inside our new process
        with salt.utils.event.get_event(
            self.opts["__role"],
            self.opts["sock_dir"],
            self.opts["transport"],
            opts=self.opts,
            listen=True,
        ) as event:
            self.wrap = ReactWrap(self.opts)

            for data in event.iter_events(full=True):
                # skip all events fired by ourselves
                if data["data"].get("user") == self.wrap.event_user:
                    continue
                if data["tag"].endswith("salt/reactors/manage/add"):
                    _data = data["data"]
                    res = self.add_reactor(_data["event"], _data["reactors"])
                    event.fire_event(
                        {"reactors": self.list_all(), "result": res},
                        "salt/reactors/manage/add-complete",
                    )
                elif data["tag"].endswith("salt/reactors/manage/delete"):
                    _data = data["data"]
                    res = self.delete_reactor(_data["event"])
                    event.fire_event(
                        {"reactors": self.list_all(), "result": res},
                        "salt/reactors/manage/delete-complete",
                    )
                elif data["tag"].endswith("salt/reactors/manage/list"):
                    event.fire_event(
                        {"reactors": self.list_all()},
                        "salt/reactors/manage/list-results",
                    )
                else:
                    reactors = self.list_reactors(data["tag"])
                    if not reactors:
                        continue
                    chunks = self.reactions(data["tag"], data["data"], reactors)
                    if chunks:
                        try:
                            self.call_reactions(chunks)
                        except SystemExit:
                            log.warning("Exit ignored by reactor")


class ReactWrap(object):
    """
    Wrapper that executes low data for the Reactor System
    """

    # class-wide cache of clients
    client_cache = None
    event_user = "Reactor"

    reaction_class = {
        "local": salt.client.LocalClient,
        "runner": salt.runner.RunnerClient,
        "wheel": salt.wheel.Wheel,
        "caller": salt.client.Caller,
    }

    def __init__(self, opts):
        self.opts = opts
        if ReactWrap.client_cache is None:
            ReactWrap.client_cache = salt.utils.cache.CacheDict(
                opts["reactor_refresh_interval"]
            )

        self.pool = salt.utils.process.ThreadPool(
            self.opts["reactor_worker_threads"],  # number of workers for runner/wheel
            queue_size=self.opts["reactor_worker_hwm"],  # queue size for those workers
        )

    def populate_client_cache(self, low):
        """
        Populate the client cache with an instance of the specified type
        """
        reaction_type = low["state"]
        if reaction_type not in self.client_cache:
            log.debug("Reactor is populating %s client cache", reaction_type)
            if reaction_type in ("runner", "wheel"):
                # Reaction types that run locally on the master want the full
                # opts passed.
                self.client_cache[reaction_type] = self.reaction_class[reaction_type](
                    self.opts
                )
                # The len() function will cause the module functions to load if
                # they aren't already loaded. We want to load them so that the
                # spawned threads don't need to load them. Loading in the
                # spawned threads creates race conditions such as sometimes not
                # finding the required function because another thread is in
                # the middle of loading the functions.
                len(self.client_cache[reaction_type].functions)
            else:
                # Reactions which use remote pubs only need the conf file when
                # instantiating a client instance.
                self.client_cache[reaction_type] = self.reaction_class[reaction_type](
                    self.opts["conf_file"]
                )

    def run(self, low):
        """
        Execute a reaction by invoking the proper wrapper func
        """
        self.populate_client_cache(low)
        try:
            l_fun = getattr(self, low["state"])
        except AttributeError:
            log.error("ReactWrap is missing a wrapper function for '%s'", low["state"])

        try:
            wrap_call = salt.utils.args.format_call(l_fun, low)
            args = wrap_call.get("args", ())
            kwargs = wrap_call.get("kwargs", {})
            # TODO: Setting user doesn't seem to work for actual remote pubs
            if low["state"] in ("runner", "wheel"):
                # Update called function's low data with event user to
                # segregate events fired by reactor and avoid reaction loops
                kwargs["__user__"] = self.event_user
                # Replace ``state`` kwarg which comes from high data compiler.
                # It breaks some runner functions and seems unnecessary.
                kwargs["__state__"] = kwargs.pop("state")
                # NOTE: if any additional keys are added here, they will also
                # need to be added to filter_kwargs()

            if "args" in kwargs:
                # New configuration
                reactor_args = kwargs.pop("args")
                for item in ("arg", "kwarg"):
                    if item in low:
                        log.warning(
                            "Reactor '%s' is ignoring '%s' param %s due to "
                            "presence of 'args' param. Check the Reactor System "
                            "documentation for the correct argument format.",
                            low["__id__"],
                            item,
                            low[item],
                        )
                if (
                    low["state"] == "caller"
                    and isinstance(reactor_args, list)
                    and not salt.utils.data.is_dictlist(reactor_args)
                ):
                    # Legacy 'caller' reactors were already using the 'args'
                    # param, but only supported a list of positional arguments.
                    # If low['args'] is a list but is *not* a dictlist, then
                    # this is actually using the legacy configuration. So, put
                    # the reactor args into kwarg['arg'] so that the wrapper
                    # interprets them as positional args.
                    kwargs["arg"] = reactor_args
                    kwargs["kwarg"] = {}
                else:
                    kwargs["arg"] = ()
                    kwargs["kwarg"] = reactor_args
                if not isinstance(kwargs["kwarg"], dict):
                    kwargs["kwarg"] = salt.utils.data.repack_dictlist(kwargs["kwarg"])
                    if not kwargs["kwarg"]:
                        log.error(
                            "Reactor '%s' failed to execute %s '%s': "
                            "Incorrect argument format, check the Reactor System "
                            "documentation for the correct format.",
                            low["__id__"],
                            low["state"],
                            low["fun"],
                        )
                        return
            else:
                # Legacy configuration
                react_call = {}
                if low["state"] in ("runner", "wheel"):
                    if "arg" not in kwargs or "kwarg" not in kwargs:
                        # Runner/wheel execute on the master, so we can use
                        # format_call to get the functions args/kwargs
                        react_fun = self.client_cache[low["state"]].functions.get(
                            low["fun"]
                        )
                        if react_fun is None:
                            log.error(
                                "Reactor '%s' failed to execute %s '%s': "
                                "function not available",
                                low["__id__"],
                                low["state"],
                                low["fun"],
                            )
                            return

                        react_call = salt.utils.args.format_call(
                            react_fun, low, expected_extra_kws=REACTOR_INTERNAL_KEYWORDS
                        )

                if "arg" not in kwargs:
                    kwargs["arg"] = react_call.get("args", ())
                if "kwarg" not in kwargs:
                    kwargs["kwarg"] = react_call.get("kwargs", {})

            # Execute the wrapper with the proper args/kwargs. kwargs['arg']
            # and kwargs['kwarg'] contain the positional and keyword arguments
            # that will be passed to the client interface to execute the
            # desired runner/wheel/remote-exec/etc. function.
            l_fun(*args, **kwargs)
        except SystemExit:
            log.warning("Reactor '%s' attempted to exit. Ignored.", low["__id__"])
        except Exception:  # pylint: disable=broad-except
            log.error(
                "Reactor '%s' failed to execute %s '%s'",
                low["__id__"],
                low["state"],
                low["fun"],
                exc_info=True,
            )

    def runner(self, fun, **kwargs):
        """
        Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
        """
        self.pool.fire_async(self.client_cache["runner"].low, args=(fun, kwargs))

    def wheel(self, fun, **kwargs):
        """
        Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
        """
        self.pool.fire_async(self.client_cache["wheel"].low, args=(fun, kwargs))

    def local(self, fun, tgt, **kwargs):
        """
        Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
        """
        self.client_cache["local"].cmd_async(tgt, fun, **kwargs)

    def caller(self, fun, **kwargs):
        """
        Wrap LocalCaller to execute remote exec functions locally on the Minion
        """
        self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
